/*
 * Copyright 2021-2022 Open Kunlun Technology <https://www.openkunlun.io>
 */

package io.openkunlun.scaladsl.client

import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import com.google.protobuf.ByteString
import io.openkunlun.scaladsl.serialization.{ DaprObjectSerialization, JsonSerialization }
import io.openkunlun.scaladsl.util.Strings
import io.openkunlun.scaladsl.v1.{ InvokeBindingRequest, InvokeBindingResponse }

import scala.concurrent.{ ExecutionContext, Future }

/**
 * @author ericxin.
 */
private object DaprBindingClient extends ExtensionId[DaprBindingClient] with ExtensionIdProvider {
  override def lookup: ExtensionId[DaprBindingClient] = DaprBindingClient
  override def createExtension(system: ExtendedActorSystem) = new DaprBindingClient(system)
}
private class DaprBindingClient(system: ExtendedActorSystem) extends Extension {
  private val serialization: DaprObjectSerialization = JsonSerialization(system)
  private val daprClient = DaprGrpcClient(system)

  def invokeBinding[T: Manifest](action: InvokeBindingAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[T] = {
    if (headers.nonEmpty) {
      var builder = daprClient.client.invokeBinding()
      headers.foreach(it => builder = builder.addHeader(it._1, it._2))
      for {
        response <- builder.invoke(buildRequest(action))
        result = deserializeInvokeBindingResponse[T](response)
      } yield result
    } else {
      for {
        response <- daprClient.client.invokeBinding(buildRequest(action))
        result = deserializeInvokeBindingResponse[T](response)
      } yield result
    }
  }

  private def buildRequest(action: InvokeBindingAction)(implicit serialization: DaprObjectSerialization) = {
    if (Strings.isEmpty(action.name)) {
      throw new BindNameIsEmptyException()
    }
    InvokeBindingRequest(
      action.name,
      ByteString.copyFrom(serialization.serialize(action.data)),
      action.metadata,
      action.operation.getOrElse(""))
  }

  private def deserializeInvokeBindingResponse[T: Manifest](response: InvokeBindingResponse)(implicit serialization: DaprObjectSerialization): T = {
    serialization.deserialize[T](response.data.toByteArray)
  }
}
